{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<center>\n",
    "<img src=\"../../img/ods_stickers.jpg\">\n",
    "## Открытый курс по машинному обучению, 3 сессия\n",
    "<center>Автор материала: Трофимов Артём Владимирович, @avt"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# <center>Dask: когда не справляется Pandas</center>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Данный тьюториал содержит краткий обзор библиотеки Dask и более подробное описание возможностей dask.dataframe.\n",
    "<br>\n",
    "При подготовке тьюториала использовались данные [2017 NYC Taxi Rides](http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Что такое Dask?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Dask - библиотека Python для параллельных вычислений.** Работает как на одной машине, максимально используя доступные вычислительные ресурсы, так и на кластере до 1000 ядер. Однако, как заметил разработчик Dask Matthew Rocklin: \"Медианный размер кластера Dask - 1 компьютер\"."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Компоненты Dask"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "1. **Big data collections** - параллельные \"ленивые\" обёртки для датафреймов Pandas, массивов NumPy и итераторов для работы с данными, размер которых превышает объем памяти.\n",
    "2. **Dynamic task scheduling** - планировщик задач, оптимизированный для вычислений."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<center><img src=\"http://dask.pydata.org/en/latest/_images/collections-schedulers.png\" height=\"30%\" widht=\"30%\"></center>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**В отдельные проекты выделены:**\n",
    "1. [Dask-ML](http://dask.pydata.org/en/latest/machine-learning.html) - оптимизированные алгоритмы sklearn, dask-xgboost (!), dask-tensorflow (!) и про \"это ваше машинное обучение\" в масштабах кластера. \n",
    "2. [Dask-distributed](https://distributed.readthedocs.io/en/latest/) - про dask на распределенном кластере"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### К теме тьюториала: использование dask.dataframe"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Dask.dataframe - это распределенный pandas.DataFrame. Если Dask.dataframe не помещается в память, то в RAM последовательно подгружаются соответствующие объему памяти части, а \"излишки\" хранятся на диске."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Какие проблемы pandas решает dask.dataframe?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Проблема №1**: данные должны помещаться в память\n",
    "<br>\n",
    "**Решение Dask**: работает с данными, которые не умещаются в память\n",
    "<br><br>\n",
    "**Проблема №2**: вычисления в 1 поток\n",
    "<br>\n",
    "**Решение Dask**: автоматическая параллелизация"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"http://dask.pydata.org/en/latest/_images/dask-dataframe.svg\" height=\"20%\" width=\"20%\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Интерфейс dask.dataframe аналогичен pandas:**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "#pandas                                 #dask\n",
    "import pandas as pd                     import dask.dataframe as dd\n",
    "df = pd.read_csv('2015-01-01.csv')      df = dd.read_csv('2015-*-*.csv')\n",
    "df.groupby(df.user_id).value.mean()     df.groupby(df.user_id).value.mean().compute()\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Эксперименты"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import gc\n",
    "import glob\n",
    "\n",
    "import dask\n",
    "import dask.dataframe as dd\n",
    "import numpy as np\n",
    "import pandas as pd"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Имеется 2 файла:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!ls data/*.csv"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Считывать будем только 4 столбца:** `VendorID`, `tpep_pickup_datetime`, `passenger_count`, `total_amount`\n",
    "<br>\n",
    "Этих данных достаточно для демонстрации возможностей Dask <s>да и комп у меня слабый</s> :trollface:\n",
    "<br><br>\n",
    "Параметры для считывания файлов:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "params = dict(header=0, \n",
    "              usecols = [0, 1, 3, 16],\n",
    "              dtype = {'1': 'datetime64'},\n",
    "              #небольшой костыль для корректного считывания данных\n",
    "              converters = {'Passenger_count': (lambda x: round(float(x), 0) // 1 if (x != 'NaN' or len(x) <= 5) else 0), \n",
    "                            'Total_amount': (lambda x: float(x) if (x != 'NaN' or len(x) <= 5) else 0)}\n",
    "             )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Читаем 1 файл"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**pandas**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "pandas_df = pd.read_csv('data/yellow_tripdata_2017-12.csv', **params)\n",
    "pandas_df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**dask**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "dask_df = dd.read_csv('data/yellow_tripdata_2017-12.csv', **params)\n",
    "dask_df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    ">Dask справился значительно быстрее, потому что pandas сначала считывает файл и выводит первые 5, а dask считывает 5 строк и сразу их выводит."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Однако, когда файл помещается в оперативную память, pandas с уже загруженными данными серьезно превосходит dask, работающий по \"ленивому\" принципу - вычисления и обработка данных происходят непосредственно при вызове метода. Реализация \"ленивого\" подхода, в принципе, характерна для ресурсоемких операций. Особенно, когда дело касается \"настоящей бигдаты\"."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Следим за использованием памяти, удаляем ненужные объекты, собираем мусор:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "del pandas_df, dask_df\n",
    "gc.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Загружаем 2 файла"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**dask**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "dask_df2 = dd.read_csv('data/*.csv', **params)\n",
    "dask_df2.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**pandas**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "pandas_df2 = pd.concat([pd.read_csv(fn, **params) for fn in glob.glob('data/*.csv')])\n",
    "pandas_df2.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    ">Учитывая, что загружаемые файлы примерно одинакового размера (~800 Mb), видим, что время обработки увеличилось линейно. Очевидно, если грузить реально большой файл(-ы), pandas рано или поздно упрётся в лимит RAM."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### OK - памяти хватает, но считает медленно..."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Для устранения этого неудобства можно просто преобразовать pandas.DataFrame в dask.datafram и считать всеми имеющимися ядрами. Автоматически, без дополнительного кода и настроек."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Используем pandas_df2 из предыдущего примера:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "dask_df3 = dd.from_pandas(pandas_df2, npartitions=2, chunksize=None)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**pandas'овский датафрейм просто переопределим для нумерации датафреймов:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "pandas_df3 = pandas_df2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Уборка:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "del pandas_df2\n",
    "gc.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Speed-test: dask VS. pandas"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Рассмотрим несколько примеров, наглядно демонстрирующих: с помощью dask можно значительно ускорить обработку данных.\n",
    "<br><br>\n",
    "Обратите внимание на метод `compute()` при обработке dask датафрейма - это как раз команда \"посчитать\". Без нее \"ленивый\" dask лишь определит, что нужно будет сделать непосредственно при запросе пользователя."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 1. max()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "pandas_df3['total_amount'].max()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "dask_df3['total_amount'].max().compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 2. value_counts()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "pandas_df3['passenger_count'].value_counts()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "pandas_df3['passenger_count'].value_counts()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 3. groupby() - sum()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "pandas_df3.groupby(by='VendorID')['passenger_count'].sum()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "dask_df3.groupby(by='VendorID')['passenger_count'].sum().compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    ">Очевидно, dask, автоматически используя доступные ресурсы, работает быстрее pandas даже при простых операциях."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Dask.dataframe API является частью Pandas API, но не является его полной копией - следует знать о некоторых ограничениях, например:\n",
    "1. Операции, связанные с индексированием (новый индекс) несортированных данных, затратны с вычислительной точки зрения\n",
    "2. Посторочная обработка работает медленно как в pandas, так и в dask"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Выводы"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Dask - простой и мощный инструмент для чтения больших файлов и обработки данных. Использвание dask.dataframe позволяет максимально использовать ресурсы компьютера без дополнительного кода и настроек."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**dask.dataframe <font color=\"green\">рекомендуется</font> использовать, когда:**\n",
    "1. Необходимо считать и обработать данные, не помещающиеся в память\n",
    "2. Конфигурация компьютера позволяет задействовать в вычислениях несколько ядер процессора\n",
    "3. Распределенная обработка больших датасетов с помощью стандартных инструмнтов Pandas"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Использование dask.dataframe <font color=\"red\">не рекомендуется</font>, когда:**\n",
    "1. Данные помещаются в память - pandas может справляться быстрее\n",
    "2. Данные не соответствуют табличному формату pandas\n",
    "3. Необходимо использование функционала, не реализованного в dask.dataframe API"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Источники информации:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "1. Очень крутая и подробная [документация Dask](http://dask.pydata.org/en/latest/docs.html)\n",
    "2. Презентация Matthew Rocklin, разработчика Dask - [Dask: Parallel Programming in Python](http://matthewrocklin.com/slides/dask-short.html)\n",
    "3. Материалы митапа \"Машинное обучение в Новосибирске\" - [Дмитрий Колодезев о Dask](https://www.youtube.com/watch?time_continue=193&v=emd2NOC05es)"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
